package com.atguigu.gmall.realtime.app.dwd.db;

import com.atguigu.gmall.realtime.app.BaseSQLApp;
import com.atguigu.gmall.realtime.commont.Constant;
import com.atguigu.gmall.realtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * DWD job that builds the refund-payment-success transaction fact table.
 *
 * <p>The job filters three change streams out of {@code ods_db} (refund payments,
 * order refund details and orders), joins them, looks up the payment type name in
 * {@code base_dic} and writes the rows to the sink table
 * {@code dwd_trade_refund_pay_suc}.
 *
 * <p>Created 2023/4/26 09:57.
 *
 * @author lzc
 */
public class Dwd_08_DwdTradeRefundPaySuc extends BaseSQLApp {
    /** Job name; it is also the second argument handed to {@code readOdsDb}. */
    private static final String JOB_NAME = "Dwd_08_DwdTradeRefundPaySuc";

    /**
     * Entry point: starts the job through {@code BaseSQLApp.init}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): the numeric arguments look like a port and a parallelism;
        // confirm against BaseSQLApp.init, which is not visible in this file.
        new Dwd_08_DwdTradeRefundPaySuc().init(
            30008,
            2,
            JOB_NAME
        );
    }

    /**
     * Builds the pipeline: registers the sources, registers the three filtered
     * views, joins them and inserts the joined rows into the sink table.
     *
     * @param env  the stream execution environment (not used by this job)
     * @param tEnv the table environment all SQL statements run against
     */
    @Override
    protected void handle(StreamExecutionEnvironment env,
                          StreamTableEnvironment tEnv) {
        // 1. Source tables. The queries below read `ods_db` and `base_dic`, so these
        //    inherited helpers are expected to register them (definitions not visible here).
        readOdsDb(tEnv, JOB_NAME);
        readBaseDic(tEnv);

        // 2. Filtered views consumed by the join.
        registerRefundPayment(tEnv);
        registerOrderRefundInfo(tEnv);
        registerOrderInfo(tEnv);

        // 3. Join, then 4. write out.
        Table result = joinRefundPaySuc(tEnv);
        createSinkTable(tEnv);
        result.executeInsert("dwd_trade_refund_pay_suc");
    }

    /** Registers view {@code refund_payment}: refund-payment updates whose refund_status became '1602'. */
    private void registerRefundPayment(StreamTableEnvironment tEnv) {
        // `old`['refund_status'] is not null keeps only updates that carry a previous
        // refund_status value. NOTE(review): presumably `old` holds the pre-update
        // values of changed columns only; confirm against the ods_db schema.
        Table refundPayment = tEnv.sqlQuery(
            "select " +
                "data['id'] id,  " +
                "data['order_id'] order_id,  " +
                "data['sku_id'] sku_id,  " +
                "data['payment_type'] payment_type,  " +
                "data['callback_time'] callback_time,  " +
                "data['total_amount'] total_amount,  " +
                "pt,  " +
                "ts  " +
                "from ods_db " +
                "where `database`='gmall2023' " +
                "and `table`='refund_payment' " +
                "and `type`='update' " +
                "and `old`['refund_status'] is not null  " +
                "and `data`['refund_status']='1602'  ");
        tEnv.createTemporaryView("refund_payment", refundPayment);
    }

    /** Registers view {@code order_refund_info}: refund-detail updates whose refund_status became '0705'. */
    private void registerOrderRefundInfo(StreamTableEnvironment tEnv) {
        // Only the columns the join actually reads are projected.
        Table orderRefundInfo = tEnv.sqlQuery(
            "select " +
                "data['order_id'] order_id,  " +
                "data['sku_id'] sku_id,  " +
                "data['refund_num'] refund_num  " +
                "from ods_db " +
                "where `database`='gmall2023' " +
                "and `table`='order_refund_info' " +
                "and `type`='update' " +
                "and `old`['refund_status'] is not null  " +
                "and `data`['refund_status']='0705'  ");
        tEnv.createTemporaryView("order_refund_info", orderRefundInfo);
    }

    /** Registers view {@code order_info}: order updates whose order_status became '1006'. */
    private void registerOrderInfo(StreamTableEnvironment tEnv) {
        // Only the columns the join actually reads are projected.
        Table orderInfo = tEnv.sqlQuery(
            "select " +
                "data['id'] id,  " +
                "data['user_id'] user_id,  " +
                "data['province_id'] province_id  " +
                "from ods_db " +
                "where `database`='gmall2023' " +
                "and `table`='order_info' " +
                "and `type`='update' " +
                "and `old`['order_status'] is not null  " +
                "and `data`['order_status']='1006'  ");
        tEnv.createTemporaryView("order_info", orderInfo);
    }

    /**
     * Joins the three filtered views and looks up the payment type name in {@code base_dic}.
     *
     * @return the joined table, with columns in the same order as the sink table
     */
    private Table joinRefundPaySuc(StreamTableEnvironment tEnv) {
        // NOTE(review): the two regular joins below keep rows of both inputs in state.
        // No state TTL / idle-state retention is configured in this class; confirm that
        // BaseSQLApp (not visible here) sets one, otherwise join state is never expired.
        return tEnv.sqlQuery(
            "select " +
                "rp.id,  " +
                "oi.user_id,  " +
                "rp.order_id,  " +
                "rp.sku_id,  " +
                "oi.province_id,  " +
                "rp.payment_type payment_type_code,  " +
                "dic.dic_name payment_type_name,  " +
                "date_format(rp.callback_time,'yyyy-MM-dd') date_id,  " +
                "rp.callback_time,  " +
                "ri.refund_num,  " +
                // Aliased so the column name matches the sink's refund_amount column.
                "rp.total_amount refund_amount,  " +
                "rp.ts  " +
                "from refund_payment rp " +
                "join order_refund_info ri " +
                "on rp.order_id=ri.order_id and rp.sku_id=ri.sku_id " +
                "join order_info oi " +
                "on rp.order_id=oi.id " +
                "join base_dic for system_time as of rp.pt as dic " +
                "on rp.payment_type=dic.dic_code ");
    }

    /** Creates sink table {@code dwd_trade_refund_pay_suc} using the Kafka sink DDL from {@code SQLUtil}. */
    private void createSinkTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
            "create table dwd_trade_refund_pay_suc(  " +
                "id string,  " +
                "user_id string,  " +
                "order_id string,  " +
                "sku_id string,  " +
                "province_id string,  " +
                "payment_type_code string,  " +
                "payment_type_name string,  " +
                "date_id string,  " +
                "callback_time string,  " +
                "refund_num string,  " +
                "refund_amount string,  " +
                "ts bigint   " +
                ")" + SQLUtil.getKafkaSinkDDL(Constant.TOPIC_DWD_TRADE_REFUND_PAY_SUC));
    }
}
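/*
Refund-payment-success transaction fact table
     Refund payment table: keep refund-success records      update  1601 -> 1602
     Order refund table:   keep order-refund detail records update       -> 0705
     Order table:          keep refund-success order records update      -> 1006
     Dictionary table:     degenerate the payment type (code -> name)
 */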